package org.apache.celestial.core.stream;

import org.apache.celestial.connector.ConnectorConstants;
import org.apache.celestial.connector.IConnector;
import org.apache.celestial.connector.KafkaConnector;
import org.apache.celestial.connector.SocketConnector;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Supplier;
import java.util.stream.Stream;

/**
 * Entry point for building data streams on top of {@link IConnector} implementations.
 *
 * <p>Each {@code create*Connector} method instantiates a connector, initialises it with
 * the given properties and exposes its {@code getData()} calls as a {@link Stream}.
 *
 * @param <T> element type produced by the socket connector stream
 */
@Component
public class EnvironmentContext<T> {

    /**
     * Configuration entries copied in at construction time.
     * NOTE(review): nothing in this class reads the map yet.
     */
    private final Map<String, Object> config = new HashMap<>();

    /**
     * Creates a context with an empty configuration.
     */
    public EnvironmentContext() {
    }

    /**
     * Creates a context whose configuration is a copy of the entries held by
     * {@code configuration}.
     *
     * @param configuration source of the configuration entries; must not be {@code null}
     * @throws NullPointerException if {@code configuration} is {@code null}
     */
    public EnvironmentContext(Configuration configuration) {
        Objects.requireNonNull(configuration, "configuration must not be null");
        this.config.putAll(configuration.getConfig());
    }

    /**
     * Initialises a {@link SocketConnector} for the given port and returns a stream
     * whose elements are obtained by calling the connector's {@code getData()}.
     *
     * <p>The stream comes from {@link Stream#generate}, so it is infinite; callers
     * have to bound it themselves (for example with {@code limit}).
     * NOTE(review): the connector is never released by this class - confirm whether
     * {@code IConnector} needs an explicit shutdown.
     *
     * @param port port stored under {@link ConnectorConstants#SOCKET_PORT}; must not be {@code null}
     * @return an infinite stream backed by the connector
     * @throws NullPointerException if {@code port} is {@code null}
     * @throws Exception if the connector cannot be initialised
     */
    public Stream<T> createSocketConnector(Integer port) throws Exception {
        // Properties is a Hashtable and rejects null values with a message-less NPE;
        // fail up front with a message that names the offending argument.
        Objects.requireNonNull(port, "port must not be null");

        Properties socketProps = new Properties();
        socketProps.put(ConnectorConstants.SOCKET_PORT, port);

        IConnector<T> connector = new SocketConnector<>();
        connector.init(socketProps);
        return Stream.generate(connector::getData);
    }

    /**
     * Initialises a {@link KafkaConnector} with the given properties and returns a
     * stream whose elements are obtained by calling the connector's {@code getData()}.
     *
     * <p>The stream comes from {@link Stream#generate}, so it is infinite; callers
     * have to bound it themselves (for example with {@code limit}).
     * NOTE(review): the connector is never released by this class - confirm whether
     * {@code IConnector} needs an explicit shutdown.
     *
     * @param props properties passed unchanged to the connector's {@code init}
     * @return an infinite stream of strings backed by the connector
     * @throws Exception if the connector cannot be initialised
     */
    public Stream<String> createKafkaConnector(Properties props) throws Exception {
        IConnector<String> connector = new KafkaConnector();
        connector.init(props);
        return Stream.generate(connector::getData);
    }

}
